// BSD License (http://www.galagosearch.org/license)

package org.galagosearch.tupleflow.execution;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;

/**
 * A Job object specifies a TupleFlow execution: the objects used, their parameters,
 * and how they communicate.  A Job can be specified in XML (and parsed with JobConstructor),
 * or in code.
 * 
 * @author trevor
 */
public class Job implements Serializable {
    // All stages of this job, keyed by stage name.  Kept in a sorted map so
    // that findStagesWithPrefix can select "prefix.*" names as a key range.
    public TreeMap<String, Stage> stages = new TreeMap<String, Stage>();
    // Connections between stage points; appended to by add(String, Job)
    // and by the connect methods.
    public ArrayList<Connection> connections = new ArrayList<Connection>();
    // NOTE(review): not read or written anywhere in this class; presumably
    // maps an exported name to the end point it exposes -- confirm with callers.
    public HashMap<String, ConnectionEndPoint> exports = new HashMap<String, ConnectionEndPoint>();
    // Name/value pairs; toString() writes each one as a <property> element.
    public HashMap<String, String> properties = new HashMap<String, String>();

    /**
     * Joins the entries of an order specification into one string,
     * separating them with single spaces.
     *
     * @param order the order entries to join; must not be null
     * @return the joined text, or an empty string when there are no entries
     */
    private String orderString(String[] order) {
        final StringBuilder joined = new StringBuilder();
        for (int i = 0; i < order.length; i++) {
            // The separator is only written once something has been emitted.
            if (joined.length() > 0) {
                joined.append(' ');
            }
            joined.append(order[i]);
        }
        return joined.toString();
    }

    /**
     * Sometimes it's convenient to specify a group of stages as its own job,
     * with connections that flow between stages specified in the job.  The
     * add method allows you to add another job to this job.  To avoid name
     * conflicts, all stages in the job are renamed from <tt>stageName</tt>
     * to <tt>jobName.stageName</tt>.
     */
    public void add(String jobName, Job group) {
        assert this != group;

        // Every stage name coming from the group is qualified with this prefix.
        final String prefix = jobName + ".";

        // Copy the stages under their qualified names.
        for (Stage original : group.stages.values()) {
            Stage renamed = original.clone();
            renamed.name = prefix + original.name;
            add(renamed);
        }

        // Copy the connections, re-pointing both ends at the renamed stages.
        for (Connection original : group.connections) {
            Connection renamed = original.clone();

            for (ConnectionEndPoint endPoint : renamed.inputs) {
                endPoint.setStageName(prefix + endPoint.getStageName());
            }
            for (ConnectionEndPoint endPoint : renamed.outputs) {
                endPoint.setStageName(prefix + endPoint.getStageName());
            }

            connections.add(renamed);
        }
    }

    /**
     * Adds a stage to the current job.
     */
    public void add(Stage s) {
        // The stage is registered under its own name; an existing stage
        // with the same name is replaced.
        final String stageName = s.name;
        stages.put(stageName, s);
    }

    /**
     * Finds the stages addressed by a name.
     *
     * If a stage is registered under exactly <tt>prefix</tt>, only that stage
     * is returned (in a fresh map).  Otherwise the result contains every stage
     * whose name starts with <tt>prefix + "."</tt>, i.e. the stages of a
     * sub-job added under that name; in that case the returned map is a view
     * backed by the stage map of this job.
     *
     * @param prefix a stage name or a sub-job name
     * @return the matching stages keyed by name; possibly empty
     */
    Map<String, Stage> findStagesWithPrefix(String prefix) {
        Map<String, Stage> result;
        if (stages.containsKey(prefix)) {
            result = new HashMap<String, Stage>();
            result.put(prefix, stages.get(prefix));
        } else {
            // Names starting with "prefix." are exactly the keys in
            // ["prefix.", "prefix/"), because '/' is the character right
            // after '.'.  The cast is essential: '.' + 1 alone is the int 47,
            // which would be concatenated as the text "47".
            String lowerBound = prefix + '.';
            String upperBound = prefix + (char) ('.' + 1);
            result = stages.subMap(lowerBound, upperBound);
        }

        return result;
    }

    /**
     * Names one connection point of one stage as a (stage name, point name)
     * pair, optionally carrying the resolved StageConnectionPoint object.
     *
     * Equality, hashing and ordering are based on the two names only; the
     * attached point object does not take part in any of them.
     */
    public static class StagePoint implements Comparable<StagePoint> {
        String stageName;
        String pointName;
        private StageConnectionPoint point;

        /**
         * Creates a stage point whose connection point object is not yet known.
         */
        public StagePoint(String stageName, String pointName) {
            this(stageName, pointName, null);
        }

        /**
         * Creates a stage point.
         *
         * @param stageName name of the stage that owns the point
         * @param pointName name of the connection point within that stage
         * @param point the connection point object, or null if unresolved
         */
        public StagePoint(String stageName, String pointName, StageConnectionPoint point) {
            this.stageName = stageName;
            this.pointName = pointName;
            this.point = point;
        }

        /**
         * Typed overload of {@link #equals(Object)} that compares the two
         * names without a class check.  Returns false for a null argument;
         * this matters because a call such as <tt>p.equals(null)</tt>
         * resolves to this overload rather than to equals(Object).
         */
        public boolean equals(StagePoint other) {
            if (other == null) {
                return false;
            }
            return sameName(stageName, other.stageName) && sameName(pointName, other.pointName);
        }

        /**
         * Orders stage points by stage name, then by point name.
         */
        public int compareTo(StagePoint other) {
            int result = stageName.compareTo(other.stageName);
            if (result != 0) {
                return result;
            }
            return pointName.compareTo(other.pointName);
        }

        /**
         * Hash over both names.  A null name contributes 0, so that objects
         * accepted by equals(Object) can always be hashed; for non-null names
         * the value is stageName.hashCode() + pointName.hashCode() * 3.
         */
        @Override
        public int hashCode() {
            int stageHash = (stageName == null) ? 0 : stageName.hashCode();
            int pointHash = (pointName == null) ? 0 : pointName.hashCode();
            return stageHash + pointHash * 3;
        }

        /**
         * Two stage points are equal when they are of the same class and
         * both their stage names and their point names match.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }

            final StagePoint other = (StagePoint) obj;
            return sameName(stageName, other.stageName) && sameName(pointName, other.pointName);
        }

        /** Null-tolerant string equality used by both equals methods. */
        private static boolean sameName(String a, String b) {
            return a == b || (a != null && a.equals(b));
        }

        public StageConnectionPoint getPoint() {
            return point;
        }

        public void setPoint(StageConnectionPoint point) {
            this.point = point;
        }

        /**
         * Renders the point as <tt>stageName:pointName</tt>.
         */
        @Override
        public String toString() {
            return String.format("%s:%s", stageName, pointName);
        }
    }

    /**
     * Collects the connection points of the requested type from a group of
     * stages.
     *
     * @param allStages the stages to scan
     * @param type the kind of connection point to keep
     * @return one StagePoint (with its connection point attached) for every
     *         matching point of every given stage
     */
    HashSet<StagePoint> extractStagePoints(Collection<Stage> allStages, ConnectionPointType type) {
        HashSet<StagePoint> matches = new HashSet<StagePoint>();

        for (Stage stage : allStages) {
            for (Map.Entry<String, StageConnectionPoint> entry : stage.connections.entrySet()) {
                StageConnectionPoint candidate = entry.getValue();
                if (candidate.type != type) {
                    continue;
                }
                // The map key is used as the point name.
                matches.add(new StagePoint(stage.name, entry.getKey(), candidate));
            }
        }

        return matches;
    }

    /**
     * Connects outputs from stage sourceName to inputs from stage
     * destinationName.
     * 
     * Connect can make connections between any stage with the name
     * sourceName, or that starts with sourceName (same goes for
     * destinationName), which makes this particularly useful for 
     * making connections between sub-jobs.
     */
    public void connect(String sourceName, String destinationName, ConnectionAssignmentType assignment) {
        // Resolve both names to the stages they stand for.
        Map<String, Stage> sourceStages = findStagesWithPrefix(sourceName);
        Map<String, Stage> destinationStages = findStagesWithPrefix(destinationName);

        // Candidate end points: outputs on the source side, inputs on the
        // destination side.
        HashSet<StagePoint> availableOutputs =
                extractStagePoints(sourceStages.values(), ConnectionPointType.Output);
        HashSet<StagePoint> danglingInputs =
                extractStagePoints(destinationStages.values(), ConnectionPointType.Input);

        // A stage input that already appears as the output end of some
        // connection is taken; drop it from the candidates.
        for (Connection existing : connections) {
            for (ConnectionEndPoint taken : existing.outputs) {
                danglingInputs.remove(new StagePoint(taken.getStageName(), taken.getPointName()));
            }
        }

        // Group the available outputs by point name.
        HashMap<String, ArrayList<StagePoint>> outputsByName =
                new HashMap<String, ArrayList<StagePoint>>();
        for (StagePoint outputPoint : availableOutputs) {
            ArrayList<StagePoint> sameName = outputsByName.get(outputPoint.pointName);
            if (sameName == null) {
                sameName = new ArrayList<StagePoint>();
                outputsByName.put(outputPoint.pointName, sameName);
            }
            sameName.add(outputPoint);
        }

        // Wire every remaining input to the output that carries its name.
        for (StagePoint inputPoint : danglingInputs) {
            ArrayList<StagePoint> candidates = outputsByName.get(inputPoint.pointName);
            if (candidates == null) {
                continue;
            }
            assert candidates.size() == 1;
            connect(candidates.get(0), inputPoint, assignment);
        }
    }

    /**
     * Connects one stage output to one stage input, deriving the hash
     * specification from the source point: unless the assignment is
     * Combined, the connection is hashed on the order of the source point.
     * The hash count is always passed on as -1.
     *
     * If the source carries no connection point object yet, it is looked up
     * in the stages of this job by stage and point name and stored in
     * <tt>source</tt>.
     *
     * @param source the output end; its stage must belong to this job when
     *               its point object is unresolved
     * @param destination the input end
     * @param assignment how data is assigned to the destination
     * @throws IllegalArgumentException if the source point has to be
     *         resolved but its stage or point cannot be found
     */
    public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment) {
        int hashCount = -1;
        String[] hashType = null;

        if (assignment != ConnectionAssignmentType.Combined) {
            // A StagePoint built from names alone has no point object yet;
            // resolve it before asking for its order.
            StageConnectionPoint sourcePoint = source.getPoint();
            if (sourcePoint == null) {
                Stage sourceStage = stages.get(source.stageName);
                if (sourceStage == null) {
                    throw new IllegalArgumentException("Unknown source stage: " + source.stageName);
                }
                sourcePoint = sourceStage.getConnection(source.pointName);
                if (sourcePoint == null) {
                    throw new IllegalArgumentException("Unknown source point: " + source);
                }
                source.setPoint(sourcePoint);
            }
            hashType = sourcePoint.getOrder();
        }
        connect(source, destination, assignment, hashType, hashCount);
    }

    /**
     * Connects one stage output to one stage input.
     *
     * An existing connection is reused when its first input end point is
     * the given source; otherwise a new connection is created from the
     * source point's class name and order plus the given hash settings.
     * In both cases the destination is appended as a new output end point
     * of that connection.
     *
     * @param source the output end; resolved against this job's stages when
     *               it carries no connection point object
     * @param destination the input end
     * @param assignment how data is assigned to the destination
     * @param hashType hash specification for a newly created connection
     * @param hashCount hash count for a newly created connection
     */
    public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment, String[] hashType, int hashCount) {
        // Make sure the source carries its connection point object.
        if (source.getPoint() == null) {
            Stage owner = stages.get(source.stageName);
            source.setPoint(owner.getConnection(source.pointName));
        }

        // Look for a connection that is already fed by this source.
        Connection target = null;
        for (Connection candidate : connections) {
            if (candidate.inputs.isEmpty()) {
                continue;
            }
            ConnectionEndPoint feeder = candidate.inputs.get(0);
            boolean samePoint = feeder.getPointName().equals(source.pointName);
            if (samePoint && feeder.getStageName().equals(source.stageName)) {
                target = candidate;
                break;
            }
        }

        // No connection starts at this source yet, so create one.
        if (target == null) {
            StageConnectionPoint sourcePoint = source.getPoint();
            target = new Connection(null,
                                    sourcePoint.getClassName(),
                                    source.getPoint().getOrder(),
                                    hashType,
                                    hashCount);
            target.inputs.add(new ConnectionEndPoint(null,
                                                     source.stageName,
                                                     source.pointName,
                                                     ConnectionPointType.Input));
            connections.add(target);
        }

        // Attach the destination as an additional consumer.
        target.outputs.add(new ConnectionEndPoint(null,
                                                  destination.stageName,
                                                  destination.pointName,
                                                  assignment,
                                                  ConnectionPointType.Output));
    }

    /**
     * Returns this job as a graph in the DOT language.  This DOT text can be
     * used with GraphViz (http://www.graphviz.org) to display a picture of
     * the job.
     */
    public String toDotString() {
        StringBuilder builder = new StringBuilder();
        builder.append("digraph {\n");

        // One edge per (input, output) pair of every connection, labelled
        // with the connection name.
        for (Connection connection : connections) {
            for (ConnectionEndPoint input : connection.inputs) {
                for (ConnectionEndPoint output : connection.outputs) {
                    String edge = String.format("  %s -> %s [label=%s];\n",
                            quoteDotId(input.getStageName()),
                            quoteDotId(output.getStageName()),
                            quoteDotId(connection.getName()));
                    builder.append(edge);
                }
            }
        }

        // List every stage as a node so that unconnected stages show up too.
        for (Stage stage : stages.values()) {
            builder.append(String.format("  %s;\n", quoteDotId(stage.name)));
        }

        builder.append("}\n");
        return builder.toString();
    }

    /**
     * Renders a value as a double-quoted DOT ID.  Stage names created by
     * add(String, Job) contain '.', which is not allowed in an unquoted
     * DOT identifier, so names are always quoted; embedded double quotes
     * are escaped.  A null value is rendered as the text "null".
     */
    private static String quoteDotId(Object id) {
        String text = String.valueOf(id);
        return "\"" + text.replace("\"", "\\\"") + "\"";
    }

    /**
     * Renders this job as XML-like text: a <tt>job</tt> element holding the
     * properties, then the connections with their input and output end
     * points, then the stages with their connection points and steps.
     * Values are inserted as-is, without any escaping.
     */
    @Override
    public String toString() {
        final StringBuilder xml = new StringBuilder();
        xml.append("<job>\n");

        // Properties
        for (Map.Entry<String, String> property : properties.entrySet()) {
            xml.append(String.format("    <property name=\"%s\" value=\"%s\" />\n",
                                     property.getKey(), property.getValue()));
        }
        xml.append("\n");

        // Connections
        xml.append("    <connections>\n");
        for (Connection connection : connections) {
            if (connection.getHash() == null) {
                // Connection without hashing: id, class and order only.
                xml.append(String.format(
                        "        <connection id=\"%s\"         \n" +
                        "                    class=\"%s\"        \n" +
                        "                    order=\"%s\">       \n",
                        connection.getName(),
                        connection.getClassName(),
                        orderString(connection.getOrder())));
            } else {
                // Hashed connection: the hash fields and hash count are added.
                xml.append(String.format(
                        "        <connection id=\"%s\"           \n" +
                        "                    class=\"%s\"        \n" +
                        "                    order=\"%s\"        \n" +
                        "                    hash=\"%s\"         \n" +
                        "                    hashCount=\"%d\">   \n",
                        connection.getName(),
                        connection.getClassName(),
                        orderString(connection.getOrder()),
                        orderString(connection.getHash()),
                        connection.getHashCount()));
            }

            for (ConnectionEndPoint in : connection.inputs) {
                xml.append(String.format(
                        "            <input stage=\"%s\"         \n" +
                        "                   endpoint=\"%s\" />   \n",
                        in.getStageName(),
                        in.getPointName()));
            }

            for (ConnectionEndPoint out : connection.outputs) {
                xml.append(String.format(
                        "            <output stage=\"%s\"         \n" +
                        "                    endpoint=\"%s\"      \n" +
                        "                    assignment=\"%s\" /> \n",
                        out.getStageName(),
                        out.getPointName(),
                        out.getAssignment()));
            }

            xml.append("        </connection>\n");
        }
        xml.append("    </connections>\n");
        xml.append("\n");

        // Stages
        xml.append("    <stages>\n");
        for (Stage stage : stages.values()) {
            xml.append(String.format("        <stage id=\"%s\">\n", stage.name));

            xml.append("            <connections>\n");
            for (StageConnectionPoint cp : stage.connections.values()) {
                xml.append(String.format(
                        "                <%s id=\"%s\" as=\"%s\" class=\"%s\" order=\"%s\" />\n",
                        cp.type, cp.externalName, cp.internalName, cp.getClassName(),
                        orderString(cp.getOrder())));
            }
            xml.append("            </connections>\n");

            printSteps(xml, stage.steps, "steps");

            xml.append("        </stage>\n");
        }
        xml.append("    </stages>\n");

        xml.append("</job>\n");
        return xml.toString();
    }

    /**
     * Appends a list of steps to the builder, wrapped in an element named
     * <tt>tag</tt>.  Input and output steps are written by id, multi steps
     * recurse into each of their groups (wrapped in <tt>group</tt>), and
     * any other step is written by class name together with its parameters,
     * if it has any.
     *
     * @param builder receives the generated text
     * @param steps the steps to write, in order
     * @param tag name of the wrapping element
     */
    private void printSteps(final StringBuilder builder, final ArrayList<Step> steps, final String tag) {
        builder.append(String.format("            <%s>\n", tag));
        for (Step step : steps) {
            if (step instanceof InputStep) {
                InputStep input = (InputStep) step;
                String line = String.format("                <input id=\"%s\" />\n", input.getId());
                builder.append(line);
            } else if (step instanceof OutputStep) {
                OutputStep output = (OutputStep) step;
                String line = String.format("                <output id=\"%s\" />\n", output.getId());
                builder.append(line);
            } else if (step instanceof MultiStep) {
                MultiStep multi = (MultiStep) step;
                builder.append("                <multi>\n");
                for (ArrayList<Step> group : multi.groups) {
                    printSteps(builder, group, "group");
                }
                builder.append("                </multi>\n");
            } else if (step.getParameters() == null || step.getParameters().isEmpty()) {
                // No parameters: a self-closing step element is enough.
                String stepHeader = String.format("                <step class=\"%s\" />\n", step.
                                                  getClassName());
                builder.append(stepHeader);
            } else {
                String stepHeader = String.format("                <step class=\"%s\">\n", step.
                                                  getClassName());
                builder.append(stepHeader);
                String parametersString = step.getParameters().toString();

                // Strip the enclosing <parameters>...</parameters> wrapper so
                // that only its content ends up inside the step element.  If
                // the text does not have that wrapper, it is written
                // unchanged instead of failing in substring().
                final String openTag = "<parameters>";
                int open = parametersString.indexOf(openTag);
                int end = parametersString.lastIndexOf("</parameters>");
                if (open >= 0 && end >= open + openTag.length()) {
                    parametersString = parametersString.substring(open + openTag.length(), end);
                }

                builder.append(parametersString);
                builder.append("                </step>\n");
            }
        }
        // NOTE: the closing tag is indented deeper than the opening tag;
        // kept as-is so the generated text stays unchanged.
        builder.append(String.format("                </%s>\n", tag));
    }
}
    
